From 92902e68e0d047fa44ef82ad0f5473e8f7b04304 Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Fri, 12 Apr 2024 17:02:51 +0200 Subject: [PATCH] refactor: Don't run streaming group-by in partitionable gb (#15611) --- .../executors/group_by_partitioned.rs | 57 ------------------- py-polars/tests/unit/test_queries.py | 5 +- 2 files changed, 1 insertion(+), 61 deletions(-) diff --git a/crates/polars-lazy/src/physical_plan/executors/group_by_partitioned.rs b/crates/polars-lazy/src/physical_plan/executors/group_by_partitioned.rs index 3a158ea81b5e..ad654b22648e 100644 --- a/crates/polars-lazy/src/physical_plan/executors/group_by_partitioned.rs +++ b/crates/polars-lazy/src/physical_plan/executors/group_by_partitioned.rs @@ -3,8 +3,6 @@ use polars_core::utils::{accumulate_dataframes_vertical, split_df}; use rayon::prelude::*; use super::*; -#[cfg(feature = "streaming")] -use crate::physical_plan::planner::create_physical_plan; /// Take an input Executor and a multiple expressions pub struct PartitionGroupByExec { @@ -249,54 +247,6 @@ fn can_run_partitioned( } impl PartitionGroupByExec { - #[cfg(feature = "streaming")] - fn run_streaming( - &mut self, - state: &mut ExecutionState, - original_df: DataFrame, - ) -> Option> { - #[allow(clippy::needless_update)] - let group_by_options = GroupbyOptions { - slice: self.slice, - ..Default::default() - } - .into(); - let lp = LogicalPlan::GroupBy { - input: Arc::new(original_df.lazy().logical_plan), - keys: Arc::new(std::mem::take(&mut self.keys)), - aggs: std::mem::take(&mut self.aggs), - schema: self.output_schema.clone(), - apply: None, - maintain_order: false, - options: group_by_options, - }; - let mut expr_arena = Default::default(); - let mut lp_arena = Default::default(); - let node = to_alp(lp, &mut expr_arena, &mut lp_arena).unwrap(); - - let inserted = streaming::insert_streaming_nodes( - node, - &mut lp_arena, - &mut expr_arena, - &mut vec![], - false, - false, - true, - ) - .unwrap(); - - if inserted { - let mut phys_plan = create_physical_plan(node, &mut lp_arena, &mut expr_arena).unwrap(); - - if state.verbose() { - eprintln!("run STREAMING HASH AGGREGATION") - } - Some(phys_plan.execute(state)) - } else { - None - } - } - fn execute_impl( &mut self, state: &mut ExecutionState, @@ -321,13 +271,6 @@ impl PartitionGroupByExec { ); } - #[cfg(feature = "streaming")] - if !self.maintain_order && std::env::var("POLARS_NO_STREAMING_GROUPBY").is_err() { - if let Some(out) = self.run_streaming(state, original_df.clone()) { - return out; - } - } - if state.verbose() { eprintln!("run PARTITIONED HASH AGGREGATION") } diff --git a/py-polars/tests/unit/test_queries.py b/py-polars/tests/unit/test_queries.py index 83bf5f1b1735..4ca3851a09d5 100644 --- a/py-polars/tests/unit/test_queries.py +++ b/py-polars/tests/unit/test_queries.py @@ -1,7 +1,6 @@ from __future__ import annotations from datetime import date, datetime, time, timedelta -from typing import Any import numpy as np import pandas as pd @@ -115,9 +114,7 @@ def test_maintain_order_after_sampling() -> None: assert result.to_dict(as_series=False) == expected -def test_sorted_group_by_optimization(monkeypatch: Any) -> None: - monkeypatch.setenv("POLARS_NO_STREAMING_GROUPBY", "1") - +def test_sorted_group_by_optimization() -> None: df = pl.DataFrame({"a": np.random.randint(0, 5, 20)}) # the sorted optimization should not randomize the