diff --git a/polars/polars-time/src/groupby/dynamic.rs b/polars/polars-time/src/groupby/dynamic.rs index ad4eb11a2230..60b7083d2bd4 100644 --- a/polars/polars-time/src/groupby/dynamic.rs +++ b/polars/polars-time/src/groupby/dynamic.rs @@ -272,62 +272,120 @@ impl Wrap<&DataFrame> { .0 .groupby_with_series(by.clone(), true, true)? .take_groups(); - let groups = groups.into_idx(); // include boundaries cannot be parallel (easily) if include_lower_bound { - POOL.install(|| { - let mut ir = groups - .par_iter() - .map(|base_g| { - let dt = unsafe { dt.take_unchecked(base_g.1.into()) }; - - let vals = dt.downcast_iter().next().unwrap(); - let ts = vals.values().as_slice(); - let (sub_groups, lower, upper) = groupby_windows( - w, - ts, - options.closed_window, - tu, - include_lower_bound, - include_upper_bound, - options.start_by, - ); - - (lower, upper, update_subgroups_idx(&sub_groups, base_g)) - }) - .collect::>(); - - ir.iter_mut().for_each(|(lower, upper, _)| { - let lower = std::mem::take(lower); - let upper = std::mem::take(upper); - update_bounds(lower, upper) - }); - - GroupsProxy::Idx(ir.into_iter().flat_map(|(_, _, groups)| groups).collect()) + POOL.install(|| match groups { + GroupsProxy::Idx(groups) => { + let mut ir = groups + .par_iter() + .map(|base_g| { + let dt = unsafe { dt.take_unchecked(base_g.1.into()) }; + + let vals = dt.downcast_iter().next().unwrap(); + let ts = vals.values().as_slice(); + let (sub_groups, lower, upper) = groupby_windows( + w, + ts, + options.closed_window, + tu, + include_lower_bound, + include_upper_bound, + options.start_by, + ); + + (lower, upper, update_subgroups_idx(&sub_groups, base_g)) + }) + .collect::>(); + + ir.iter_mut().for_each(|(lower, upper, _)| { + let lower = std::mem::take(lower); + let upper = std::mem::take(upper); + update_bounds(lower, upper) + }); + + GroupsProxy::Idx(ir.into_iter().flat_map(|(_, _, groups)| groups).collect()) + } + GroupsProxy::Slice { groups, .. } => { + let mut ir = groups + .par_iter() + .map(|base_g| { + let dt = dt.slice(base_g[0] as i64, base_g[1] as usize); + let vals = dt.downcast_iter().next().unwrap(); + let ts = vals.values().as_slice(); + let (sub_groups, lower, upper) = groupby_windows( + w, + ts, + options.closed_window, + tu, + include_lower_bound, + include_upper_bound, + options.start_by, + ); + (lower, upper, update_subgroups_slice(&sub_groups, *base_g)) + }) + .collect::>(); + + ir.iter_mut().for_each(|(lower, upper, _)| { + let lower = std::mem::take(lower); + let upper = std::mem::take(upper); + update_bounds(lower, upper) + }); + + GroupsProxy::Slice { + groups: ir.into_iter().flat_map(|(_, _, groups)| groups).collect(), + rolling: false, + } + } }) } else { - let groupsidx = POOL.install(|| { - groups - .par_iter() - .flat_map(|base_g| { - let dt = unsafe { dt.take_unchecked(base_g.1.into()) }; - let vals = dt.downcast_iter().next().unwrap(); - let ts = vals.values().as_slice(); - let (sub_groups, _, _) = groupby_windows( - w, - ts, - options.closed_window, - tu, - include_lower_bound, - include_upper_bound, - options.start_by, - ); - update_subgroups_idx(&sub_groups, base_g) - }) - .collect() - }); - GroupsProxy::Idx(groupsidx) + POOL.install(|| match groups { + GroupsProxy::Idx(groups) => { + let groupsidx = groups + .par_iter() + .flat_map(|base_g| { + let dt = unsafe { dt.take_unchecked(base_g.1.into()) }; + let vals = dt.downcast_iter().next().unwrap(); + let ts = vals.values().as_slice(); + let (sub_groups, _, _) = groupby_windows( + w, + ts, + options.closed_window, + tu, + include_lower_bound, + include_upper_bound, + options.start_by, + ); + update_subgroups_idx(&sub_groups, base_g) + }) + .collect(); + GroupsProxy::Idx(groupsidx) + } + GroupsProxy::Slice { groups, .. } => { + let groups = groups + .par_iter() + .flat_map(|base_g| { + let dt = dt.slice(base_g[0] as i64, base_g[1] as usize); + let vals = dt.downcast_iter().next().unwrap(); + let ts = vals.values().as_slice(); + let (sub_groups, _, _) = groupby_windows( + w, + ts, + options.closed_window, + tu, + include_lower_bound, + include_upper_bound, + options.start_by, + ); + update_subgroups_slice(&sub_groups, *base_g) + }) + .collect(); + GroupsProxy::Slice { + groups, + rolling: false, + } + } + }) } };