Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chunk-wise agg: add GatherMerge above Sort #7547

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 69 additions & 60 deletions tsl/src/chunkwise_agg.c
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ generate_agg_pushdown_path(PlannerInfo *root, Path *cheapest_total_path, RelOptI
copy_append_like_path(root,
partially_compressed_append,
partially_compressed_sorted,
subpath->pathtarget));
partial_grouping_target));
}

if (can_hash)
Expand All @@ -446,7 +446,7 @@ generate_agg_pushdown_path(PlannerInfo *root, Path *cheapest_total_path, RelOptI
copy_append_like_path(root,
partially_compressed_append,
partially_compressed_hashed,
subpath->pathtarget));
partial_grouping_target));
}
}
else
Expand Down Expand Up @@ -512,23 +512,6 @@ generate_agg_pushdown_path(PlannerInfo *root, Path *cheapest_total_path, RelOptI
hashed_subpaths,
partial_grouping_target));
}

/* Finish the partial paths (just added by add_partial_path to partially_grouped_rel in this
* function) by adding a gather node and add this path to the partially_grouped_rel using
* add_path). */
foreach (lc, partially_grouped_rel->partial_pathlist)
{
Path *append_path = lfirst(lc);
double total_groups = append_path->rows * append_path->parallel_workers;

Path *gather_path = (Path *) create_gather_path(root,
partially_grouped_rel,
append_path,
partially_grouped_rel->reltarget,
NULL,
&total_groups);
add_path(partially_grouped_rel, (Path *) gather_path);
}
}
}

Expand Down Expand Up @@ -724,65 +707,91 @@ tsl_pushdown_partial_agg(PlannerInfo *root, Hypertable *ht, RelOptInfo *input_re
}

/* Replan aggregation if we were able to generate partially grouped rel paths */
if (partially_grouped_rel->pathlist == NIL)
List *partially_grouped_paths =
list_concat(partially_grouped_rel->pathlist, partially_grouped_rel->partial_pathlist);
if (partially_grouped_paths == NIL)
return;

/* Prefer our paths */
output_rel->pathlist = NIL;
output_rel->partial_pathlist = NIL;

/* Finalize the created partially aggregated paths by adding a 'Finalize Aggregate' node on top
* of them. */
/*
* Finalize the created partially aggregated paths by adding a
* 'Finalize Aggregate' node on top of them, and adding Sort and Gather
* nodes as required.
*/
AggClauseCosts *agg_final_costs = &extra_data->agg_final_costs;
foreach (lc, partially_grouped_rel->pathlist)
foreach (lc, partially_grouped_paths)
{
Path *append_path = lfirst(lc);

if (contains_path_plain_or_sorted_agg(append_path))
Path *partially_aggregated_path = lfirst(lc);
AggStrategy final_strategy;
if (contains_path_plain_or_sorted_agg(partially_aggregated_path))
{
bool is_sorted;

is_sorted = pathkeys_contained_in(root->group_pathkeys, append_path->pathkeys);

const bool is_sorted =
pathkeys_contained_in(root->group_pathkeys, partially_aggregated_path->pathkeys);
if (!is_sorted)
{
append_path = (Path *)
create_sort_path(root, output_rel, append_path, root->group_pathkeys, -1.0);
partially_aggregated_path = (Path *) create_sort_path(root,
output_rel,
partially_aggregated_path,
root->group_pathkeys,
-1.0);
}

add_path(output_rel,
(Path *) create_agg_path(root,
output_rel,
append_path,
grouping_target,
parse->groupClause ? AGG_SORTED : AGG_PLAIN,
AGGSPLIT_FINAL_DESERIAL,
#if PG16_LT
parse->groupClause,
#else
root->processed_groupClause,
#endif
(List *) parse->havingQual,
agg_final_costs,
d_num_groups));
final_strategy = parse->groupClause ? AGG_SORTED : AGG_PLAIN;
}
else
{
add_path(output_rel,
(Path *) create_agg_path(root,
output_rel,
append_path,
grouping_target,
AGG_HASHED,
AGGSPLIT_FINAL_DESERIAL,
final_strategy = AGG_HASHED;
}

/*
* We have to add a Gather or Gather Merge on top of parallel plans. It
* goes above the Sort we might have added just before, so that the Sort
* is parallelized as well.
*/
if (partially_aggregated_path->parallel_workers > 0)
{
double total_groups =
partially_aggregated_path->rows * partially_aggregated_path->parallel_workers;
if (partially_aggregated_path->pathkeys == NIL)
{
partially_aggregated_path =
(Path *) create_gather_path(root,
partially_grouped_rel,
partially_aggregated_path,
partially_grouped_rel->reltarget,
/* required_outer = */ NULL,
&total_groups);
}
else
{
partially_aggregated_path =
(Path *) create_gather_merge_path(root,
partially_grouped_rel,
partially_aggregated_path,
partially_grouped_rel->reltarget,
partially_aggregated_path->pathkeys,
/* required_outer = */ NULL,
&total_groups);
}
}

add_path(output_rel,
(Path *) create_agg_path(root,
output_rel,
partially_aggregated_path,
grouping_target,
final_strategy,
AGGSPLIT_FINAL_DESERIAL,
#if PG16_LT
parse->groupClause,
parse->groupClause,
#else
root->processed_groupClause,
root->processed_groupClause,
#endif
(List *) parse->havingQual,
agg_final_costs,
d_num_groups));
}
(List *) parse->havingQual,
agg_final_costs,
d_num_groups));
}
}
61 changes: 61 additions & 0 deletions tsl/test/shared/expected/chunkwise_agg_gather_sort.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
-- This file and its contents are licensed under the Timescale License.
-- Please see the included NOTICE for copyright information and
-- LICENSE-TIMESCALE for a copy of the license.
-- Exercise the GatherMerge -> Sort -> Append -> Partial Aggregate plan.
\set prefix 'explain (costs off, timing off, summary off)'
set parallel_setup_cost = 0;
set parallel_tuple_cost = 0;
set max_parallel_workers_per_gather = 2;
set parallel_leader_participation = off;
set enable_hashagg to off;
:prefix
select count(*) from metrics group by v0;
QUERY PLAN
Finalize GroupAggregate
Group Key: _hyper_X_X_chunk.v0
-> Gather Merge
Workers Planned: 2
-> Sort
Sort Key: _hyper_X_X_chunk.v0
-> Parallel Append
-> Partial GroupAggregate
Group Key: _hyper_X_X_chunk.v0
-> Sort
Sort Key: _hyper_X_X_chunk.v0
-> Seq Scan on _hyper_X_X_chunk
-> Partial GroupAggregate
Group Key: _hyper_X_X_chunk.v0
-> Sort
Sort Key: _hyper_X_X_chunk.v0
-> Seq Scan on _hyper_X_X_chunk
-> Partial GroupAggregate
Group Key: _hyper_X_X_chunk.v0
-> Sort
Sort Key: _hyper_X_X_chunk.v0
-> Seq Scan on _hyper_X_X_chunk
(22 rows)

reset enable_hashagg;
:prefix
select count(*) from metrics group by v0;
QUERY PLAN
Finalize HashAggregate
Group Key: _hyper_X_X_chunk.v0
-> Gather
Workers Planned: 2
-> Parallel Append
-> Partial HashAggregate
Group Key: _hyper_X_X_chunk.v0
-> Seq Scan on _hyper_X_X_chunk
-> Partial HashAggregate
Group Key: _hyper_X_X_chunk.v0
-> Seq Scan on _hyper_X_X_chunk
-> Partial HashAggregate
Group Key: _hyper_X_X_chunk.v0
-> Seq Scan on _hyper_X_X_chunk
(14 rows)

reset parallel_setup_cost;
reset parallel_tuple_cost;
reset max_parallel_workers_per_gather;
reset parallel_leader_participation;
1 change: 1 addition & 0 deletions tsl/test/shared/sql/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
set(TEST_FILES_SHARED
cagg_compression.sql
chunkwise_agg_gather_sort.sql
classify_relation.sql
compat.sql
constify_timestamptz_op_interval.sql
Expand Down
27 changes: 27 additions & 0 deletions tsl/test/shared/sql/chunkwise_agg_gather_sort.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
-- This file and its contents are licensed under the Timescale License.
-- Please see the included NOTICE for copyright information and
-- LICENSE-TIMESCALE for a copy of the license.

-- Exercise the GatherMerge -> Sort -> Append -> Partial Aggregate plan.

-- NOTE(review): pg_regress echoes this file (comments included) into the
-- companion expected/chunkwise_agg_gather_sort.out -- any edit here must be
-- mirrored there, or the shared test will fail on a diff.

\set prefix 'explain (costs off, timing off, summary off)'

-- Make parallel plans look free so the planner prefers them, and pin the
-- planned worker count so the EXPLAIN output is stable.
set parallel_setup_cost = 0;
set parallel_tuple_cost = 0;
set max_parallel_workers_per_gather = 2;
set parallel_leader_participation = off;

-- Forbid hash aggregation to force the sorted grouping path -- the case
-- that needs a Gather Merge above the parallel Sort.
set enable_hashagg to off;

:prefix
select count(*) from metrics group by v0;

-- With hash aggregation allowed again there is no output ordering to
-- preserve, so a plain Gather is expected instead of Gather Merge.
reset enable_hashagg;

:prefix
select count(*) from metrics group by v0;

-- Restore the parallel-cost GUCs so later shared tests are unaffected.
reset parallel_setup_cost;
reset parallel_tuple_cost;
reset max_parallel_workers_per_gather;
reset parallel_leader_participation;
Loading