Support chunkwise aggregation with projection (timescale#7049)
This is important for the common case of grouping by time_bucket(). In
this case, under AggPath there is a ProjectionPath above the Append node
for all the chunks. When we make a chunkwise aggregation plan, we have
to put a similar ProjectionPath under each per-chunk AggPath.
akuzm authored Sep 5, 2024
1 parent f61e77c commit d2a2498
Showing 30 changed files with 1,056 additions and 6,283 deletions.
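As a concrete illustration of the case the commit message describes, here is a minimal, hypothetical example (the schema and query are illustrative only, loosely modeled on the tables used in the tests below, and are not part of this commit) of a chunkwise-aggregatable query that groups by time_bucket():

-- Hypothetical hypertable for illustration.
CREATE TABLE conditions (
    timec       timestamptz NOT NULL,
    location    text,
    temperature float,
    humidity    float
);
SELECT create_hypertable('conditions', 'timec');

-- Grouping by the time_bucket() expression requires a projection that
-- computes the bucket between the chunk scans and the aggregation.
-- Before this commit, that ProjectionPath above the Append blocked the
-- chunkwise (partial) aggregation pushdown; now an equivalent projection
-- is placed under each per-chunk partial aggregate instead.
EXPLAIN (VERBOSE, COSTS OFF)
SELECT location, time_bucket('1 day', timec) AS bucket,
       sum(temperature), sum(humidity)
FROM conditions
GROUP BY location, bucket;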
9 changes: 9 additions & 0 deletions test/sql/updates/setup.continuous_aggs.sql
@@ -11,6 +11,12 @@
-- on timescaledb 1.7.x
CALL _timescaledb_testing.stop_workers();

-- disable chunkwise aggregation and hash aggregation, because it might lead to
-- different order of chunk creation in the cagg table, based on the underlying
-- aggregation plan.
SET timescaledb.enable_chunkwise_aggregation TO OFF;
SET enable_hashagg TO OFF;

CREATE TYPE custom_type AS (high int, low int);

CREATE TABLE conditions_before (
@@ -292,3 +298,6 @@ SELECT add_continuous_aggregate_policy('mat_drop', '7 days', '-30 days'::interva
CALL refresh_continuous_aggregate('mat_drop',NULL,NULL);

SELECT drop_chunks('drop_test', NOW() - INTERVAL '7 days');

RESET timescaledb.enable_chunkwise_aggregation;
RESET enable_hashagg;
102 changes: 81 additions & 21 deletions tsl/src/chunkwise_agg.c
@@ -78,6 +78,10 @@ get_subpaths_from_append_path(Path *path, bool handle_gather_path)
{
return get_subpaths_from_append_path(castNode(GatherPath, path)->subpath, false);
}
else if (IsA(path, ProjectionPath))
{
return get_subpaths_from_append_path(castNode(ProjectionPath, path)->subpath, false);
}

/* Aggregation push-down is not supported for other path types so far */
return NIL;
@@ -142,6 +146,17 @@ copy_append_like_path(PlannerInfo *root, Path *path, List *new_subpaths, PathTar
ts_chunk_append_path_copy(chunk_append_path, new_subpaths, pathtarget);
return &new_chunk_append_path->cpath.path;
}
else if (IsA(path, ProjectionPath))
{
/*
* Projection goes under partial aggregation, so here we can just ignore
* it.
*/
return copy_append_like_path(root,
castNode(ProjectionPath, path)->subpath,
new_subpaths,
pathtarget);
}

/* Should never happen, already checked by caller */
Ensure(false, "unknown path type");
@@ -216,33 +231,74 @@ create_hashed_partial_agg_path(PlannerInfo *root, Path *path, PathTarget *target
* Add partially aggregated subpath
*/
static void
add_partially_aggregated_subpaths(PlannerInfo *root, Path *parent_path,
add_partially_aggregated_subpaths(PlannerInfo *root, PathTarget *input_target,
PathTarget *partial_grouping_target, double d_num_groups,
GroupPathExtraData *extra_data, bool can_sort, bool can_hash,
Path *subpath, List **sorted_paths, List **hashed_paths)
{
/* Translate targetlist for partition */
AppendRelInfo *appinfo = ts_get_appendrelinfo(root, subpath->parent->relid, false);
PathTarget *chunktarget = copy_pathtarget(partial_grouping_target);
chunktarget->exprs =
castNode(List, adjust_appendrel_attrs(root, (Node *) chunktarget->exprs, 1, &appinfo));
PathTarget *chunk_grouped_target = copy_pathtarget(partial_grouping_target);
chunk_grouped_target->exprs =
castNode(List,
adjust_appendrel_attrs(root,
(Node *) chunk_grouped_target->exprs,
/* nappinfos = */ 1,
&appinfo));

/* In declarative partitioning planning, this is done by appy_scanjoin_target_to_path */
Assert(list_length(subpath->pathtarget->exprs) == list_length(parent_path->pathtarget->exprs));
subpath->pathtarget->sortgrouprefs = parent_path->pathtarget->sortgrouprefs;
/*
* We might have to project before aggregation. In declarative partitioning
* planning, the projection is applied by apply_scanjoin_target_to_path().
*/
PathTarget *chunk_target_before_grouping = copy_pathtarget(input_target);
chunk_target_before_grouping->exprs =
castNode(List,
adjust_appendrel_attrs(root,
(Node *) chunk_target_before_grouping->exprs,
/* nappinfos = */ 1,
&appinfo));
/*
* Note that we cannot use apply_projection_to_path() here, because it might
* modify the targetlist of the projection-capable paths in place, which
* would cause a mismatch when these paths are used in another context.
*
* In case of DecompressChunk path, we can make a copy of it and push the
* projection down to it.
*
* In general, the projection here arises because the pathtarget of the
* table scans is determined early based on the reltarget which lists all
* used columns in attno order, and the pathtarget before grouping is
* computed later and has the grouping columns in front.
*/
if (ts_is_decompress_chunk_path(subpath))
{
subpath = (Path *) copy_decompress_chunk_path((DecompressChunkPath *) subpath);
subpath->pathtarget = chunk_target_before_grouping;
}
else
{
subpath = (Path *)
create_projection_path(root, subpath->parent, subpath, chunk_target_before_grouping);
}

if (can_sort)
{
AggPath *agg_path =
create_sorted_partial_agg_path(root, subpath, chunktarget, d_num_groups, extra_data);
AggPath *agg_path = create_sorted_partial_agg_path(root,
subpath,
chunk_grouped_target,
d_num_groups,
extra_data);

*sorted_paths = lappend(*sorted_paths, (Path *) agg_path);
}

if (can_hash)
{
AggPath *agg_path =
create_hashed_partial_agg_path(root, subpath, chunktarget, d_num_groups, extra_data);
AggPath *agg_path = create_hashed_partial_agg_path(root,
subpath,
chunk_grouped_target,
d_num_groups,
extra_data);

*hashed_paths = lappend(*hashed_paths, (Path *) agg_path);
}
@@ -256,10 +312,11 @@ add_partially_aggregated_subpaths(PlannerInfo *root, Path *parent_path,
* AGGSPLIT_FINAL_DESERIAL step.
*/
static void
generate_agg_pushdown_path(PlannerInfo *root, Path *cheapest_total_path, RelOptInfo *output_rel,
RelOptInfo *partially_grouped_rel, PathTarget *grouping_target,
PathTarget *partial_grouping_target, bool can_sort, bool can_hash,
double d_num_groups, GroupPathExtraData *extra_data)
generate_agg_pushdown_path(PlannerInfo *root, Path *cheapest_total_path, RelOptInfo *input_rel,
RelOptInfo *output_rel, RelOptInfo *partially_grouped_rel,
PathTarget *grouping_target, PathTarget *partial_grouping_target,
bool can_sort, bool can_hash, double d_num_groups,
GroupPathExtraData *extra_data)
{
/* Get subpaths */
List *subpaths = get_subpaths_from_append_path(cheapest_total_path, false);
@@ -306,7 +363,7 @@ generate_agg_pushdown_path(PlannerInfo *root, Path *cheapest_total_path, RelOptI
Path *subsubpath = lfirst(lc2);

add_partially_aggregated_subpaths(root,
cheapest_total_path,
input_rel->reltarget,
partial_grouping_target,
d_num_groups,
extra_data,
@@ -338,7 +395,7 @@ generate_agg_pushdown_path(PlannerInfo *root, Path *cheapest_total_path, RelOptI
else
{
add_partially_aggregated_subpaths(root,
cheapest_total_path,
input_rel->reltarget,
partial_grouping_target,
d_num_groups,
extra_data,
@@ -384,9 +441,10 @@ generate_agg_pushdown_path(PlannerInfo *root, Path *cheapest_total_path, RelOptI
*/
static void
generate_partial_agg_pushdown_path(PlannerInfo *root, Path *cheapest_partial_path,
RelOptInfo *output_rel, RelOptInfo *partially_grouped_rel,
PathTarget *grouping_target, PathTarget *partial_grouping_target,
bool can_sort, bool can_hash, double d_num_groups,
RelOptInfo *input_rel, RelOptInfo *output_rel,
RelOptInfo *partially_grouped_rel, PathTarget *grouping_target,
PathTarget *partial_grouping_target, bool can_sort,
bool can_hash, double d_num_groups,
GroupPathExtraData *extra_data)
{
/* Get subpaths */
@@ -420,7 +478,7 @@ generate_partial_agg_pushdown_path(PlannerInfo *root, Path *cheapest_partial_pat
Assert(get_subpaths_from_append_path(subpath, false) == NIL);

add_partially_aggregated_subpaths(root,
cheapest_partial_path,
input_rel->reltarget,
partial_grouping_target,
d_num_groups,
extra_data,
@@ -645,6 +703,7 @@ tsl_pushdown_partial_agg(PlannerInfo *root, Hypertable *ht, RelOptInfo *input_re
Assert(cheapest_total_path != NULL);
generate_agg_pushdown_path(root,
cheapest_total_path,
input_rel,
output_rel,
partially_grouped_rel,
grouping_target,
@@ -660,6 +719,7 @@
Path *cheapest_partial_path = linitial(input_rel->partial_pathlist);
generate_partial_agg_pushdown_path(root,
cheapest_partial_path,
input_rel,
output_rel,
partially_grouped_rel,
grouping_target,
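One way to observe the effect of the planner changes above is to compare plans with chunkwise aggregation toggled. This is a sketch that reuses the hypothetical conditions table from the earlier example and assumes the GUC defaults to on, as the test setup above implies:

-- With chunkwise aggregation enabled, the fixed plan is a Finalize
-- GroupAggregate over an Append of per-chunk Partial GroupAggregates,
-- each chunk scan computing time_bucket() in its output target.
SET timescaledb.enable_chunkwise_aggregation TO ON;
EXPLAIN (VERBOSE, COSTS OFF)
SELECT location, time_bucket('1 day', timec) AS bucket, sum(temperature)
FROM conditions
GROUP BY location, bucket;

-- Disabled, the plan falls back to a single GroupAggregate above one
-- projection over the Append of plain chunk scans.
SET timescaledb.enable_chunkwise_aggregation TO OFF;
EXPLAIN (VERBOSE, COSTS OFF)
SELECT location, time_bucket('1 day', timec) AS bucket, sum(temperature)
FROM conditions
GROUP BY location, bucket;

RESET timescaledb.enable_chunkwise_aggregation;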
2 changes: 1 addition & 1 deletion tsl/src/nodes/decompress_chunk/decompress_chunk.c
@@ -244,7 +244,7 @@ build_compressed_scan_pathkeys(SortInfo *sort_info, PlannerInfo *root, List *chu
sort_info->required_compressed_pathkeys = required_compressed_pathkeys;
}

static DecompressChunkPath *
DecompressChunkPath *
copy_decompress_chunk_path(DecompressChunkPath *src)
{
DecompressChunkPath *dst = palloc(sizeof(DecompressChunkPath));
2 changes: 2 additions & 0 deletions tsl/src/nodes/decompress_chunk/decompress_chunk.h
@@ -61,3 +61,5 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *rel, Hype
Chunk *chunk);

extern bool ts_is_decompress_chunk_path(Path *path);

DecompressChunkPath *copy_decompress_chunk_path(DecompressChunkPath *src);
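The static qualifier on copy_decompress_chunk_path() is dropped and a declaration added here because add_partially_aggregated_subpaths() in chunkwise_agg.c now copies a DecompressChunk path and pushes the pre-grouping projection into the copy's pathtarget, instead of stacking a separate ProjectionPath on top of it.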
64 changes: 42 additions & 22 deletions tsl/test/expected/cagg_query.out
@@ -112,25 +112,35 @@ select * from mat_m1 order by sumh, sumt, minl, timec ;

:EXPLAIN
select * from regview order by timec desc;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Sort
Output: _hyper_1_1_chunk.location, (time_bucket('@ 1 day'::interval, _hyper_1_1_chunk.timec)), (min(_hyper_1_1_chunk.location)), (sum(_hyper_1_1_chunk.temperature)), (sum(_hyper_1_1_chunk.humidity))
Sort Key: (time_bucket('@ 1 day'::interval, _hyper_1_1_chunk.timec)) DESC
-> GroupAggregate
-> Finalize GroupAggregate
Output: _hyper_1_1_chunk.location, (time_bucket('@ 1 day'::interval, _hyper_1_1_chunk.timec)), min(_hyper_1_1_chunk.location), sum(_hyper_1_1_chunk.temperature), sum(_hyper_1_1_chunk.humidity)
Group Key: _hyper_1_1_chunk.location, (time_bucket('@ 1 day'::interval, _hyper_1_1_chunk.timec))
-> Sort
Output: _hyper_1_1_chunk.location, (time_bucket('@ 1 day'::interval, _hyper_1_1_chunk.timec)), _hyper_1_1_chunk.temperature, _hyper_1_1_chunk.humidity
Output: _hyper_1_1_chunk.location, (time_bucket('@ 1 day'::interval, _hyper_1_1_chunk.timec)), (PARTIAL min(_hyper_1_1_chunk.location)), (PARTIAL sum(_hyper_1_1_chunk.temperature)), (PARTIAL sum(_hyper_1_1_chunk.humidity))
Sort Key: _hyper_1_1_chunk.location, (time_bucket('@ 1 day'::interval, _hyper_1_1_chunk.timec))
-> Result
Output: _hyper_1_1_chunk.location, time_bucket('@ 1 day'::interval, _hyper_1_1_chunk.timec), _hyper_1_1_chunk.temperature, _hyper_1_1_chunk.humidity
-> Append
-> Seq Scan on _timescaledb_internal._hyper_1_1_chunk
Output: _hyper_1_1_chunk.location, _hyper_1_1_chunk.timec, _hyper_1_1_chunk.temperature, _hyper_1_1_chunk.humidity
-> Seq Scan on _timescaledb_internal._hyper_1_2_chunk
Output: _hyper_1_2_chunk.location, _hyper_1_2_chunk.timec, _hyper_1_2_chunk.temperature, _hyper_1_2_chunk.humidity
(16 rows)
-> Append
-> Partial GroupAggregate
Output: _hyper_1_1_chunk.location, (time_bucket('@ 1 day'::interval, _hyper_1_1_chunk.timec)), PARTIAL min(_hyper_1_1_chunk.location), PARTIAL sum(_hyper_1_1_chunk.temperature), PARTIAL sum(_hyper_1_1_chunk.humidity)
Group Key: _hyper_1_1_chunk.location, (time_bucket('@ 1 day'::interval, _hyper_1_1_chunk.timec))
-> Sort
Output: _hyper_1_1_chunk.location, (time_bucket('@ 1 day'::interval, _hyper_1_1_chunk.timec)), _hyper_1_1_chunk.temperature, _hyper_1_1_chunk.humidity
Sort Key: _hyper_1_1_chunk.location, (time_bucket('@ 1 day'::interval, _hyper_1_1_chunk.timec))
-> Seq Scan on _timescaledb_internal._hyper_1_1_chunk
Output: _hyper_1_1_chunk.location, time_bucket('@ 1 day'::interval, _hyper_1_1_chunk.timec), _hyper_1_1_chunk.temperature, _hyper_1_1_chunk.humidity
-> Partial GroupAggregate
Output: _hyper_1_2_chunk.location, (time_bucket('@ 1 day'::interval, _hyper_1_2_chunk.timec)), PARTIAL min(_hyper_1_2_chunk.location), PARTIAL sum(_hyper_1_2_chunk.temperature), PARTIAL sum(_hyper_1_2_chunk.humidity)
Group Key: _hyper_1_2_chunk.location, (time_bucket('@ 1 day'::interval, _hyper_1_2_chunk.timec))
-> Sort
Output: _hyper_1_2_chunk.location, (time_bucket('@ 1 day'::interval, _hyper_1_2_chunk.timec)), _hyper_1_2_chunk.temperature, _hyper_1_2_chunk.humidity
Sort Key: _hyper_1_2_chunk.location, (time_bucket('@ 1 day'::interval, _hyper_1_2_chunk.timec))
-> Seq Scan on _timescaledb_internal._hyper_1_2_chunk
Output: _hyper_1_2_chunk.location, time_bucket('@ 1 day'::interval, _hyper_1_2_chunk.timec), _hyper_1_2_chunk.temperature, _hyper_1_2_chunk.humidity
(26 rows)

-- PUSHDOWN cases --
-- all group by elts in order by , reorder group by elts to match
@@ -446,19 +456,29 @@ select * from mat_m1, regview where mat_m1.timec > '2018-10-01' and mat_m1.timec
-> Hash Join
Output: _hyper_2_4_chunk.location, _hyper_2_4_chunk.timec, _hyper_2_4_chunk.minl, _hyper_2_4_chunk.sumt, _hyper_2_4_chunk.sumh, _hyper_1_1_chunk.location, (time_bucket('@ 1 day'::interval, _hyper_1_1_chunk.timec)), (min(_hyper_1_1_chunk.location)), (sum(_hyper_1_1_chunk.temperature)), (sum(_hyper_1_1_chunk.humidity))
Hash Cond: ((time_bucket('@ 1 day'::interval, _hyper_1_1_chunk.timec)) = _hyper_2_4_chunk.timec)
-> GroupAggregate
-> Finalize GroupAggregate
Output: _hyper_1_1_chunk.location, (time_bucket('@ 1 day'::interval, _hyper_1_1_chunk.timec)), min(_hyper_1_1_chunk.location), sum(_hyper_1_1_chunk.temperature), sum(_hyper_1_1_chunk.humidity)
Group Key: _hyper_1_1_chunk.location, (time_bucket('@ 1 day'::interval, _hyper_1_1_chunk.timec))
-> Sort
Output: _hyper_1_1_chunk.location, (time_bucket('@ 1 day'::interval, _hyper_1_1_chunk.timec)), _hyper_1_1_chunk.temperature, _hyper_1_1_chunk.humidity
Output: _hyper_1_1_chunk.location, (time_bucket('@ 1 day'::interval, _hyper_1_1_chunk.timec)), (PARTIAL min(_hyper_1_1_chunk.location)), (PARTIAL sum(_hyper_1_1_chunk.temperature)), (PARTIAL sum(_hyper_1_1_chunk.humidity))
Sort Key: _hyper_1_1_chunk.location, (time_bucket('@ 1 day'::interval, _hyper_1_1_chunk.timec))
-> Result
Output: _hyper_1_1_chunk.location, time_bucket('@ 1 day'::interval, _hyper_1_1_chunk.timec), _hyper_1_1_chunk.temperature, _hyper_1_1_chunk.humidity
-> Append
-> Seq Scan on _timescaledb_internal._hyper_1_1_chunk
Output: _hyper_1_1_chunk.location, _hyper_1_1_chunk.timec, _hyper_1_1_chunk.temperature, _hyper_1_1_chunk.humidity
-> Seq Scan on _timescaledb_internal._hyper_1_2_chunk
Output: _hyper_1_2_chunk.location, _hyper_1_2_chunk.timec, _hyper_1_2_chunk.temperature, _hyper_1_2_chunk.humidity
-> Append
-> Partial GroupAggregate
Output: _hyper_1_1_chunk.location, (time_bucket('@ 1 day'::interval, _hyper_1_1_chunk.timec)), PARTIAL min(_hyper_1_1_chunk.location), PARTIAL sum(_hyper_1_1_chunk.temperature), PARTIAL sum(_hyper_1_1_chunk.humidity)
Group Key: _hyper_1_1_chunk.location, (time_bucket('@ 1 day'::interval, _hyper_1_1_chunk.timec))
-> Sort
Output: _hyper_1_1_chunk.location, (time_bucket('@ 1 day'::interval, _hyper_1_1_chunk.timec)), _hyper_1_1_chunk.temperature, _hyper_1_1_chunk.humidity
Sort Key: _hyper_1_1_chunk.location, (time_bucket('@ 1 day'::interval, _hyper_1_1_chunk.timec))
-> Seq Scan on _timescaledb_internal._hyper_1_1_chunk
Output: _hyper_1_1_chunk.location, time_bucket('@ 1 day'::interval, _hyper_1_1_chunk.timec), _hyper_1_1_chunk.temperature, _hyper_1_1_chunk.humidity
-> Partial GroupAggregate
Output: _hyper_1_2_chunk.location, (time_bucket('@ 1 day'::interval, _hyper_1_2_chunk.timec)), PARTIAL min(_hyper_1_2_chunk.location), PARTIAL sum(_hyper_1_2_chunk.temperature), PARTIAL sum(_hyper_1_2_chunk.humidity)
Group Key: _hyper_1_2_chunk.location, (time_bucket('@ 1 day'::interval, _hyper_1_2_chunk.timec))
-> Sort
Output: _hyper_1_2_chunk.location, (time_bucket('@ 1 day'::interval, _hyper_1_2_chunk.timec)), _hyper_1_2_chunk.temperature, _hyper_1_2_chunk.humidity
Sort Key: _hyper_1_2_chunk.location, (time_bucket('@ 1 day'::interval, _hyper_1_2_chunk.timec))
-> Seq Scan on _timescaledb_internal._hyper_1_2_chunk
Output: _hyper_1_2_chunk.location, time_bucket('@ 1 day'::interval, _hyper_1_2_chunk.timec), _hyper_1_2_chunk.temperature, _hyper_1_2_chunk.humidity
-> Hash
Output: _hyper_2_4_chunk.location, _hyper_2_4_chunk.timec, _hyper_2_4_chunk.minl, _hyper_2_4_chunk.sumt, _hyper_2_4_chunk.sumh
-> Append
Expand All @@ -477,7 +497,7 @@ select * from mat_m1, regview where mat_m1.timec > '2018-10-01' and mat_m1.timec
Output: _hyper_1_2_chunk_1.location, _hyper_1_2_chunk_1.timec, _hyper_1_2_chunk_1.temperature, _hyper_1_2_chunk_1.humidity
Index Cond: ((_hyper_1_2_chunk_1.timec >= 'Sat Nov 03 17:00:00 2018 PDT'::timestamp with time zone) AND (_hyper_1_2_chunk_1.timec > 'Mon Oct 01 00:00:00 2018 PDT'::timestamp with time zone))
Filter: (time_bucket('@ 1 day'::interval, _hyper_1_2_chunk_1.timec) > 'Mon Oct 01 00:00:00 2018 PDT'::timestamp with time zone)
(37 rows)
(47 rows)

select l.locid, mat_m1.* from mat_m1 , location_tab l where timec > '2018-10-01' and l.locname = mat_m1.location order by timec desc;
locid | location | timec | minl | sumt | sumh
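In these updated expected outputs, the old plan (a single GroupAggregate above a Result projection over the Append) becomes a Finalize GroupAggregate above an Append of per-chunk Partial GroupAggregates, with time_bucket() now computed in each chunk scan's output. This is the per-chunk projection described in the commit message.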