Skip to content

Commit

Permalink
perf(rust, python): improve dynamic groupby performance on sorted keys
Browse files (browse the repository at this point in the history)
  • Loading branch information
ritchie46 committed Jan 31, 2023
1 parent c2bfb37 commit 125b6b0
Showing 1 changed file with 109 additions and 51 deletions.
160 changes: 109 additions & 51 deletions polars/polars-time/src/groupby/dynamic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,62 +272,120 @@ impl Wrap<&DataFrame> {
.0
.groupby_with_series(by.clone(), true, true)?
.take_groups();
let groups = groups.into_idx();

// include boundaries cannot be parallel (easily)
if include_lower_bound {
POOL.install(|| {
let mut ir = groups
.par_iter()
.map(|base_g| {
let dt = unsafe { dt.take_unchecked(base_g.1.into()) };

let vals = dt.downcast_iter().next().unwrap();
let ts = vals.values().as_slice();
let (sub_groups, lower, upper) = groupby_windows(
w,
ts,
options.closed_window,
tu,
include_lower_bound,
include_upper_bound,
options.start_by,
);

(lower, upper, update_subgroups_idx(&sub_groups, base_g))
})
.collect::<Vec<_>>();

ir.iter_mut().for_each(|(lower, upper, _)| {
let lower = std::mem::take(lower);
let upper = std::mem::take(upper);
update_bounds(lower, upper)
});

GroupsProxy::Idx(ir.into_iter().flat_map(|(_, _, groups)| groups).collect())
POOL.install(|| match groups {
GroupsProxy::Idx(groups) => {
let mut ir = groups
.par_iter()
.map(|base_g| {
let dt = unsafe { dt.take_unchecked(base_g.1.into()) };

let vals = dt.downcast_iter().next().unwrap();
let ts = vals.values().as_slice();
let (sub_groups, lower, upper) = groupby_windows(
w,
ts,
options.closed_window,
tu,
include_lower_bound,
include_upper_bound,
options.start_by,
);

(lower, upper, update_subgroups_idx(&sub_groups, base_g))
})
.collect::<Vec<_>>();

ir.iter_mut().for_each(|(lower, upper, _)| {
let lower = std::mem::take(lower);
let upper = std::mem::take(upper);
update_bounds(lower, upper)
});

GroupsProxy::Idx(ir.into_iter().flat_map(|(_, _, groups)| groups).collect())
}
GroupsProxy::Slice { groups, .. } => {
let mut ir = groups
.par_iter()
.map(|base_g| {
let dt = dt.slice(base_g[0] as i64, base_g[1] as usize);
let vals = dt.downcast_iter().next().unwrap();
let ts = vals.values().as_slice();
let (sub_groups, lower, upper) = groupby_windows(
w,
ts,
options.closed_window,
tu,
include_lower_bound,
include_upper_bound,
options.start_by,
);
(lower, upper, update_subgroups_slice(&sub_groups, *base_g))
})
.collect::<Vec<_>>();

ir.iter_mut().for_each(|(lower, upper, _)| {
let lower = std::mem::take(lower);
let upper = std::mem::take(upper);
update_bounds(lower, upper)
});

GroupsProxy::Slice {
groups: ir.into_iter().flat_map(|(_, _, groups)| groups).collect(),
rolling: false,
}
}
})
} else {
let groupsidx = POOL.install(|| {
groups
.par_iter()
.flat_map(|base_g| {
let dt = unsafe { dt.take_unchecked(base_g.1.into()) };
let vals = dt.downcast_iter().next().unwrap();
let ts = vals.values().as_slice();
let (sub_groups, _, _) = groupby_windows(
w,
ts,
options.closed_window,
tu,
include_lower_bound,
include_upper_bound,
options.start_by,
);
update_subgroups_idx(&sub_groups, base_g)
})
.collect()
});
GroupsProxy::Idx(groupsidx)
POOL.install(|| match groups {
GroupsProxy::Idx(groups) => {
let groupsidx = groups
.par_iter()
.flat_map(|base_g| {
let dt = unsafe { dt.take_unchecked(base_g.1.into()) };
let vals = dt.downcast_iter().next().unwrap();
let ts = vals.values().as_slice();
let (sub_groups, _, _) = groupby_windows(
w,
ts,
options.closed_window,
tu,
include_lower_bound,
include_upper_bound,
options.start_by,
);
update_subgroups_idx(&sub_groups, base_g)
})
.collect();
GroupsProxy::Idx(groupsidx)
}
GroupsProxy::Slice { groups, .. } => {
let groups = groups
.par_iter()
.flat_map(|base_g| {
let dt = dt.slice(base_g[0] as i64, base_g[1] as usize);
let vals = dt.downcast_iter().next().unwrap();
let ts = vals.values().as_slice();
let (sub_groups, _, _) = groupby_windows(
w,
ts,
options.closed_window,
tu,
include_lower_bound,
include_upper_bound,
options.start_by,
);
update_subgroups_slice(&sub_groups, *base_g)
})
.collect();
GroupsProxy::Slice {
groups,
rolling: false,
}
}
})
}
};

Expand Down

0 comments on commit 125b6b0

Please sign in to comment.