diff --git a/tsl/src/chunkwise_agg.c b/tsl/src/chunkwise_agg.c index 5d319e5f4b2..46a51b2d344 100644 --- a/tsl/src/chunkwise_agg.c +++ b/tsl/src/chunkwise_agg.c @@ -437,7 +437,7 @@ generate_agg_pushdown_path(PlannerInfo *root, Path *cheapest_total_path, RelOptI copy_append_like_path(root, partially_compressed_append, partially_compressed_sorted, - subpath->pathtarget)); + partial_grouping_target)); } if (can_hash) @@ -446,7 +446,7 @@ generate_agg_pushdown_path(PlannerInfo *root, Path *cheapest_total_path, RelOptI copy_append_like_path(root, partially_compressed_append, partially_compressed_hashed, - subpath->pathtarget)); + partial_grouping_target)); } } else @@ -512,23 +512,6 @@ generate_agg_pushdown_path(PlannerInfo *root, Path *cheapest_total_path, RelOptI hashed_subpaths, partial_grouping_target)); } - - /* Finish the partial paths (just added by add_partial_path to partially_grouped_rel in this - * function) by adding a gather node and add this path to the partially_grouped_rel using - * add_path). */ - foreach (lc, partially_grouped_rel->partial_pathlist) - { - Path *append_path = lfirst(lc); - double total_groups = append_path->rows * append_path->parallel_workers; - - Path *gather_path = (Path *) create_gather_path(root, - partially_grouped_rel, - append_path, - partially_grouped_rel->reltarget, - NULL, - &total_groups); - add_path(partially_grouped_rel, (Path *) gather_path); - } } } @@ -724,65 +707,91 @@ tsl_pushdown_partial_agg(PlannerInfo *root, Hypertable *ht, RelOptInfo *input_re } /* Replan aggregation if we were able to generate partially grouped rel paths */ - if (partially_grouped_rel->pathlist == NIL) + List *partially_grouped_paths = + list_concat(partially_grouped_rel->pathlist, partially_grouped_rel->partial_pathlist); + if (partially_grouped_paths == NIL) return; /* Prefer our paths */ output_rel->pathlist = NIL; output_rel->partial_pathlist = NIL; - /* Finalize the created partially aggregated paths by adding a 'Finalize Aggregate' node on top - * of them. */ + /* + * Finalize the created partially aggregated paths by adding a + * 'Finalize Aggregate' node on top of them, and adding Sort and Gather + * nodes as required. + */ AggClauseCosts *agg_final_costs = &extra_data->agg_final_costs; - foreach (lc, partially_grouped_rel->pathlist) + foreach (lc, partially_grouped_paths) { - Path *append_path = lfirst(lc); - - if (contains_path_plain_or_sorted_agg(append_path)) + Path *partially_aggregated_path = lfirst(lc); + AggStrategy final_strategy; + if (contains_path_plain_or_sorted_agg(partially_aggregated_path)) { - bool is_sorted; - - is_sorted = pathkeys_contained_in(root->group_pathkeys, append_path->pathkeys); - + const bool is_sorted = + pathkeys_contained_in(root->group_pathkeys, partially_aggregated_path->pathkeys); if (!is_sorted) { - append_path = (Path *) - create_sort_path(root, output_rel, append_path, root->group_pathkeys, -1.0); + partially_aggregated_path = (Path *) create_sort_path(root, + output_rel, + partially_aggregated_path, + root->group_pathkeys, + -1.0); } - add_path(output_rel, - (Path *) create_agg_path(root, - output_rel, - append_path, - grouping_target, - parse->groupClause ? AGG_SORTED : AGG_PLAIN, - AGGSPLIT_FINAL_DESERIAL, -#if PG16_LT - parse->groupClause, -#else - root->processed_groupClause, -#endif - (List *) parse->havingQual, - agg_final_costs, - d_num_groups)); + final_strategy = parse->groupClause ? AGG_SORTED : AGG_PLAIN; } else { - add_path(output_rel, - (Path *) create_agg_path(root, - output_rel, - append_path, - grouping_target, - AGG_HASHED, - AGGSPLIT_FINAL_DESERIAL, + final_strategy = AGG_HASHED; + } + + /* + * We have to add a Gather or Gather Merge on top of parallel plans. It + * goes above the Sort we might have added just before, so that the Sort + * is parallelized as well. + */ + if (partially_aggregated_path->parallel_workers > 0) + { + double total_groups = + partially_aggregated_path->rows * partially_aggregated_path->parallel_workers; + if (partially_aggregated_path->pathkeys == NIL) + { + partially_aggregated_path = + (Path *) create_gather_path(root, + partially_grouped_rel, + partially_aggregated_path, + partially_grouped_rel->reltarget, + /* required_outer = */ NULL, + &total_groups); + } + else + { + partially_aggregated_path = + (Path *) create_gather_merge_path(root, + partially_grouped_rel, + partially_aggregated_path, + partially_grouped_rel->reltarget, + partially_aggregated_path->pathkeys, + /* required_outer = */ NULL, + &total_groups); + } + } + + add_path(output_rel, + (Path *) create_agg_path(root, + output_rel, + partially_aggregated_path, + grouping_target, + final_strategy, + AGGSPLIT_FINAL_DESERIAL, #if PG16_LT - parse->groupClause, + parse->groupClause, #else - root->processed_groupClause, + root->processed_groupClause, #endif - (List *) parse->havingQual, - agg_final_costs, - d_num_groups)); - } + (List *) parse->havingQual, + agg_final_costs, + d_num_groups)); } } diff --git a/tsl/test/shared/expected/chunkwise_agg_gather_sort.out b/tsl/test/shared/expected/chunkwise_agg_gather_sort.out new file mode 100644 index 00000000000..9e4c3ce735d --- /dev/null +++ b/tsl/test/shared/expected/chunkwise_agg_gather_sort.out @@ -0,0 +1,61 @@ +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +-- Exercise the GatherMerge -> Sort -> Append -> Partial Aggregate plan. +\set prefix 'explain (costs off, timing off, summary off)' +set parallel_setup_cost = 0; +set parallel_tuple_cost = 0; +set max_parallel_workers_per_gather = 2; +set parallel_leader_participation = off; +set enable_hashagg to off; +:prefix +select count(*) from metrics group by v0; +QUERY PLAN + Finalize GroupAggregate + Group Key: _hyper_X_X_chunk.v0 + -> Gather Merge + Workers Planned: 2 + -> Sort + Sort Key: _hyper_X_X_chunk.v0 + -> Parallel Append + -> Partial GroupAggregate + Group Key: _hyper_X_X_chunk.v0 + -> Sort + Sort Key: _hyper_X_X_chunk.v0 + -> Seq Scan on _hyper_X_X_chunk + -> Partial GroupAggregate + Group Key: _hyper_X_X_chunk.v0 + -> Sort + Sort Key: _hyper_X_X_chunk.v0 + -> Seq Scan on _hyper_X_X_chunk + -> Partial GroupAggregate + Group Key: _hyper_X_X_chunk.v0 + -> Sort + Sort Key: _hyper_X_X_chunk.v0 + -> Seq Scan on _hyper_X_X_chunk +(22 rows) + +reset enable_hashagg; +:prefix +select count(*) from metrics group by v0; +QUERY PLAN + Finalize HashAggregate + Group Key: _hyper_X_X_chunk.v0 + -> Gather + Workers Planned: 2 + -> Parallel Append + -> Partial HashAggregate + Group Key: _hyper_X_X_chunk.v0 + -> Seq Scan on _hyper_X_X_chunk + -> Partial HashAggregate + Group Key: _hyper_X_X_chunk.v0 + -> Seq Scan on _hyper_X_X_chunk + -> Partial HashAggregate + Group Key: _hyper_X_X_chunk.v0 + -> Seq Scan on _hyper_X_X_chunk +(14 rows) + +reset parallel_setup_cost; +reset parallel_tuple_cost; +reset max_parallel_workers_per_gather; +reset parallel_leader_participation; diff --git a/tsl/test/shared/sql/CMakeLists.txt b/tsl/test/shared/sql/CMakeLists.txt index 74ddb376c9c..109295d882b 100644 --- a/tsl/test/shared/sql/CMakeLists.txt +++ b/tsl/test/shared/sql/CMakeLists.txt @@ -1,5 +1,6 @@ set(TEST_FILES_SHARED cagg_compression.sql + chunkwise_agg_gather_sort.sql classify_relation.sql compat.sql constify_timestamptz_op_interval.sql diff --git a/tsl/test/shared/sql/chunkwise_agg_gather_sort.sql b/tsl/test/shared/sql/chunkwise_agg_gather_sort.sql new file mode 100644 index 00000000000..39f062f73e9 --- /dev/null +++ b/tsl/test/shared/sql/chunkwise_agg_gather_sort.sql @@ -0,0 +1,27 @@ +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. + +-- Exercise the GatherMerge -> Sort -> Append -> Partial Aggregate plan. + +\set prefix 'explain (costs off, timing off, summary off)' + +set parallel_setup_cost = 0; +set parallel_tuple_cost = 0; +set max_parallel_workers_per_gather = 2; +set parallel_leader_participation = off; + +set enable_hashagg to off; + +:prefix +select count(*) from metrics group by v0; + +reset enable_hashagg; + +:prefix +select count(*) from metrics group by v0; + +reset parallel_setup_cost; +reset parallel_tuple_cost; +reset max_parallel_workers_per_gather; +reset parallel_leader_participation;