From 14c2c1d6e8cfec4a53521eab5c2b6fb011b76c31 Mon Sep 17 00:00:00 2001 From: Alexander Kuzmenkov <36882414+akuzm@users.noreply.github.com> Date: Mon, 25 Nov 2024 16:17:36 +0100 Subject: [PATCH] Fix variable resolution in vectorized aggregation planning (#7415) We didn't properly resolve INDEX_VARs in the output targetlist of DecompressChunk nodes, which are present when it uses a custom scan targetlist. Fix this by always working with the targetlist where these variables are resolved to uncompressed chunk variables, like we do during execution. --- .unreleased/resolve-vars | 2 + tsl/src/nodes/vector_agg/exec.c | 17 +- tsl/src/nodes/vector_agg/plan.c | 156 +++++++++++++------ tsl/test/expected/vector_agg_param.out | 87 +++++++++-- tsl/test/expected/vectorized_aggregation.out | 7 + tsl/test/sql/vector_agg_param.sql | 15 +- tsl/test/sql/vectorized_aggregation.sql | 4 + 7 files changed, 220 insertions(+), 68 deletions(-) create mode 100644 .unreleased/resolve-vars diff --git a/.unreleased/resolve-vars b/.unreleased/resolve-vars new file mode 100644 index 00000000000..51f42a33713 --- /dev/null +++ b/.unreleased/resolve-vars @@ -0,0 +1,2 @@ +Fixes: #7410 "aggregated compressed column not found" error on aggregation query. +Thanks: @uasiddiqi for reporting the "aggregated compressed column not found" error. diff --git a/tsl/src/nodes/vector_agg/exec.c b/tsl/src/nodes/vector_agg/exec.c index 2da6e21c063..1d773ce1c4e 100644 --- a/tsl/src/nodes/vector_agg/exec.c +++ b/tsl/src/nodes/vector_agg/exec.c @@ -25,16 +25,19 @@ get_input_offset(DecompressChunkState *decompress_state, Var *var) { DecompressContext *dcontext = &decompress_state->decompress_context; + /* + * All variable references in the vectorized aggregation node were + * translated to uncompressed chunk variables when it was created. + */ + CustomScan *cscan = castNode(CustomScan, decompress_state->csstate.ss.ps.plan); + Ensure((Index) var->varno == (Index) cscan->scan.scanrelid, + "got vector varno %d expected %d", + var->varno, + cscan->scan.scanrelid); + CompressionColumnDescription *value_column_description = NULL; for (int i = 0; i < dcontext->num_data_columns; i++) { - /* - * See the column lookup in compute_plain_qual() for the discussion of - * which attribute numbers occur where. At the moment here it is - * uncompressed_scan_attno, but it might be an oversight of not rewriting - * the references into INDEX_VAR (or OUTER_VAR...?) when we create the - * VectorAgg node. 
- */ CompressionColumnDescription *current_column = &dcontext->compressed_chunk_columns[i]; if (current_column->uncompressed_chunk_attno == var->varattno) { diff --git a/tsl/src/nodes/vector_agg/plan.c b/tsl/src/nodes/vector_agg/plan.c index 2d24cad5a7f..c8c204b5b8d 100644 --- a/tsl/src/nodes/vector_agg/plan.c +++ b/tsl/src/nodes/vector_agg/plan.c @@ -74,29 +74,44 @@ resolve_outer_special_vars_mutator(Node *node, void *context) return expression_tree_mutator(node, resolve_outer_special_vars_mutator, context); } - Var *aggregated_var = castNode(Var, node); - Ensure(aggregated_var->varno == OUTER_VAR, - "encountered unexpected varno %d as an aggregate argument", - aggregated_var->varno); - + Var *var = castNode(Var, node); CustomScan *custom = castNode(CustomScan, context); - TargetEntry *decompress_chunk_tentry = - castNode(TargetEntry, list_nth(custom->scan.plan.targetlist, aggregated_var->varattno - 1)); - Var *decompressed_var = castNode(Var, decompress_chunk_tentry->expr); - if (decompressed_var->varno == INDEX_VAR) + if ((Index) var->varno == (Index) custom->scan.scanrelid) + { + /* + * This is already the uncompressed chunk var. We can see it referenced + * by expressions in the output targetlist of DecompressChunk node. + */ + return (Node *) copyObject(var); + } + + if (var->varno == OUTER_VAR) + { + /* + * Reference into the output targetlist of the DecompressChunk node. + */ + TargetEntry *decompress_chunk_tentry = + castNode(TargetEntry, list_nth(custom->scan.plan.targetlist, var->varattno - 1)); + + return resolve_outer_special_vars_mutator((Node *) decompress_chunk_tentry->expr, context); + } + + if (var->varno == INDEX_VAR) { /* * This is a reference into the custom scan targetlist, we have to resolve * it as well. */ - decompressed_var = - castNode(Var, - castNode(TargetEntry, - list_nth(custom->custom_scan_tlist, decompressed_var->varattno - 1)) - ->expr); - } - Assert(decompressed_var->varno > 0); - return (Node *) copyObject(decompressed_var); + var = castNode(Var, + castNode(TargetEntry, list_nth(custom->custom_scan_tlist, var->varattno - 1)) + ->expr); + Assert(var->varno > 0); + + return (Node *) copyObject(var); + } + + Ensure(false, "encountered unexpected varno %d as an aggregate argument", var->varno); + return node; } /* @@ -115,20 +130,20 @@ resolve_outer_special_vars(List *agg_tlist, CustomScan *custom) * node. */ static Plan * -vector_agg_plan_create(Agg *agg, CustomScan *decompress_chunk) +vector_agg_plan_create(Agg *agg, CustomScan *decompress_chunk, List *resolved_targetlist) { CustomScan *vector_agg = (CustomScan *) makeNode(CustomScan); vector_agg->custom_plans = list_make1(decompress_chunk); vector_agg->methods = &scan_methods; + vector_agg->custom_scan_tlist = resolved_targetlist; + /* * Note that this is being called from the post-planning hook, and therefore * after set_plan_refs(). The meaning of output targetlists is different from * the previous planning stages, and they contain special varnos referencing * the scan targetlists. */ - vector_agg->custom_scan_tlist = - resolve_outer_special_vars(agg->plan.targetlist, decompress_chunk); vector_agg->scan.plan.targetlist = build_trivial_custom_output_targetlist(vector_agg->custom_scan_tlist); @@ -179,44 +194,64 @@ is_vector_var(CustomScan *custom, Expr *expr, bool *out_is_segmentby) return false; } - Var *aggregated_var = castNode(Var, expr); + Var *decompressed_var = castNode(Var, expr); /* - * Check if this particular column is a segmentby or has bulk decompression - * enabled. 
This hook is called after set_plan_refs, and at this stage the - * output targetlist of the aggregation node uses OUTER_VAR references into - * the child scan targetlist, so first we have to translate this. + * This must be called after resolve_outer_special_vars(), so we should only + * see the uncompressed chunk variables here. */ - Assert(aggregated_var->varno == OUTER_VAR); - TargetEntry *decompressed_target_entry = - list_nth(custom->scan.plan.targetlist, AttrNumberGetAttrOffset(aggregated_var->varattno)); + Ensure((Index) decompressed_var->varno == (Index) custom->scan.scanrelid, + "expected scan varno %d got %d", + custom->scan.scanrelid, + decompressed_var->varno); - if (!IsA(decompressed_target_entry->expr, Var)) + if (decompressed_var->varattno <= 0) { - /* - * Can only aggregate the plain Vars. Not sure if this is redundant with - * the similar check above. - */ + /* Can't work with special attributes like tableoid. */ + if (out_is_segmentby) + { + *out_is_segmentby = false; + } return false; } - Var *decompressed_var = castNode(Var, decompressed_target_entry->expr); /* * Now, we have to translate the decompressed varno into the compressed * column index, to check if the column supports bulk decompression. */ List *decompression_map = list_nth(custom->custom_private, DCP_DecompressionMap); - List *is_segmentby_column = list_nth(custom->custom_private, DCP_IsSegmentbyColumn); - List *bulk_decompression_column = list_nth(custom->custom_private, DCP_BulkDecompressionColumn); int compressed_column_index = 0; for (; compressed_column_index < list_length(decompression_map); compressed_column_index++) { - if (list_nth_int(decompression_map, compressed_column_index) == decompressed_var->varattno) + const int custom_scan_attno = list_nth_int(decompression_map, compressed_column_index); + if (custom_scan_attno <= 0) + { + continue; + } + + int uncompressed_chunk_attno = 0; + if (custom->custom_scan_tlist == NIL) + { + uncompressed_chunk_attno = custom_scan_attno; + } + else + { + Var *var = castNode(Var, + castNode(TargetEntry, + list_nth(custom->custom_scan_tlist, + AttrNumberGetAttrOffset(custom_scan_attno))) + ->expr); + uncompressed_chunk_attno = var->varattno; + } + + if (uncompressed_chunk_attno == decompressed_var->varattno) { break; } } Ensure(compressed_column_index < list_length(decompression_map), "compressed column not found"); + + List *bulk_decompression_column = list_nth(custom->custom_private, DCP_BulkDecompressionColumn); Assert(list_length(decompression_map) == list_length(bulk_decompression_column)); const bool bulk_decompression_enabled_for_column = list_nth_int(bulk_decompression_column, compressed_column_index); @@ -233,6 +268,8 @@ is_vector_var(CustomScan *custom, Expr *expr, bool *out_is_segmentby) /* * Check if this column is a segmentby. */ + List *is_segmentby_column = list_nth(custom->custom_private, DCP_IsSegmentbyColumn); + Assert(list_length(is_segmentby_column) == list_length(decompression_map)); const bool is_segmentby = list_nth_int(is_segmentby_column, compressed_column_index); if (out_is_segmentby) { @@ -317,7 +354,7 @@ can_vectorize_aggref(Aggref *aggref, CustomScan *custom) * Currently supports either no grouping or grouping by segmentby columns. 
*/ static bool -can_vectorize_grouping(Agg *agg, CustomScan *custom) +can_vectorize_grouping(Agg *agg, CustomScan *custom, List *resolved_targetlist) { if (agg->numCols == 0) { @@ -327,7 +364,7 @@ can_vectorize_grouping(Agg *agg, CustomScan *custom) for (int i = 0; i < agg->numCols; i++) { int offset = AttrNumberGetAttrOffset(agg->grpColIdx[i]); - TargetEntry *entry = list_nth(agg->plan.targetlist, offset); + TargetEntry *entry = list_nth_node(TargetEntry, resolved_targetlist, offset); bool is_segmentby = false; if (!is_vector_var(custom, entry->expr, &is_segmentby)) @@ -519,25 +556,48 @@ try_insert_vector_agg_node(Plan *plan) return plan; } - if (!can_vectorize_grouping(agg, custom)) + /* + * To make it easier to examine the variables participating in the aggregation, + * the subsequent checks are performed on the aggregated targetlist with + * all variables resolved to uncompressed chunk variables. + */ + List *resolved_targetlist = resolve_outer_special_vars(agg->plan.targetlist, custom); + + if (!can_vectorize_grouping(agg, custom, resolved_targetlist)) { /* No GROUP BY support for now. */ return plan; } - /* Now check the aggregate functions themselves. */ + /* Now check the output targetlist. */ ListCell *lc; - foreach (lc, agg->plan.targetlist) + foreach (lc, resolved_targetlist) { TargetEntry *target_entry = castNode(TargetEntry, lfirst(lc)); - if (!IsA(target_entry->expr, Aggref)) + if (IsA(target_entry->expr, Aggref)) { - continue; + Aggref *aggref = castNode(Aggref, target_entry->expr); + if (!can_vectorize_aggref(aggref, custom)) + { + /* Aggregate function not vectorizable. */ + return plan; + } } - - Aggref *aggref = castNode(Aggref, target_entry->expr); - if (!can_vectorize_aggref(aggref, custom)) + else if (IsA(target_entry->expr, Var)) + { + if (!is_vector_var(custom, target_entry->expr, NULL)) + { + /* Variable not vectorizable. */ + return plan; + } + } + else { + /* + * Sometimes the plan can require this node to perform a projection, + * e.g. we can see a nested loop param in its output targetlist. We + * can't handle this case currently. + */ return plan; } } @@ -546,5 +606,5 @@ try_insert_vector_agg_node(Plan *plan) * Finally, all requirements are satisfied and we can vectorize this partial * aggregation node. */ - return vector_agg_plan_create(agg, custom); + return vector_agg_plan_create(agg, custom, resolved_targetlist); } diff --git a/tsl/test/expected/vector_agg_param.out b/tsl/test/expected/vector_agg_param.out index b481d9c8a97..3d717b10d2a 100644 --- a/tsl/test/expected/vector_agg_param.out +++ b/tsl/test/expected/vector_agg_param.out @@ -21,23 +21,42 @@ select count(compress_chunk(x)) from show_chunks('pvagg') x; (1 row) analyze pvagg; -explain (costs off) +-- The reference for this test is generated using the standard Postgres +-- aggregation. When you change this test, recheck the results against the +-- Postgres aggregation by uncommenting the below GUC. 
+-- set timescaledb.enable_vectorized_aggregation to off; +explain (verbose, costs off) select * from unnest(array[0, 1, 2]::int[]) x, lateral (select sum(a) from pvagg where s = x) xx; - QUERY PLAN ---------------------------------------------------------------------------- + QUERY PLAN +--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Nested Loop - -> Function Scan on unnest x + Output: x.x, (sum(pvagg.a)) + -> Function Scan on pg_catalog.unnest x + Output: x.x + Function Call: unnest('{0,1,2}'::integer[]) -> Finalize Aggregate - -> Custom Scan (ChunkAppend) on pvagg + Output: sum(pvagg.a) + -> Custom Scan (ChunkAppend) on public.pvagg + Output: (PARTIAL sum(pvagg.a)) + Startup Exclusion: false + Runtime Exclusion: true -> Custom Scan (VectorAgg) - -> Custom Scan (DecompressChunk) on _hyper_1_1_chunk - -> Seq Scan on compress_hyper_2_3_chunk - Filter: (s = x.x) + Output: (PARTIAL sum(_hyper_1_1_chunk.a)) + Grouping Policy: all compressed batches + -> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_1_1_chunk + Output: _hyper_1_1_chunk.a + -> Seq Scan on _timescaledb_internal.compress_hyper_2_3_chunk + Output: compress_hyper_2_3_chunk._ts_meta_count, compress_hyper_2_3_chunk.s, compress_hyper_2_3_chunk._ts_meta_min_1, compress_hyper_2_3_chunk._ts_meta_max_1, compress_hyper_2_3_chunk.a + Filter: (compress_hyper_2_3_chunk.s = x.x) -> Custom Scan (VectorAgg) - -> Custom Scan (DecompressChunk) on _hyper_1_2_chunk - -> Seq Scan on compress_hyper_2_4_chunk - Filter: (s = x.x) -(12 rows) + Output: (PARTIAL sum(_hyper_1_2_chunk.a)) + Grouping Policy: all compressed batches + -> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_1_2_chunk + Output: _hyper_1_2_chunk.a + -> Seq Scan on _timescaledb_internal.compress_hyper_2_4_chunk + Output: compress_hyper_2_4_chunk._ts_meta_count, compress_hyper_2_4_chunk.s, compress_hyper_2_4_chunk._ts_meta_min_1, compress_hyper_2_4_chunk._ts_meta_max_1, compress_hyper_2_4_chunk.a + Filter: (compress_hyper_2_4_chunk.s = x.x) +(27 rows) select * from unnest(array[0, 1, 2]::int[]) x, lateral (select sum(a) from pvagg where s = x) xx; x | sum @@ -47,4 +66,48 @@ select * from unnest(array[0, 1, 2]::int[]) x, lateral (select sum(a) from pvagg 2 | 1498500 (3 rows) +explain (verbose, costs off) +select * from unnest(array[0, 1, 2]::int[]) x, lateral (select sum(a + x) from pvagg) xx; + QUERY PLAN +--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Nested Loop + Output: x.x, (sum((_hyper_1_1_chunk.a + x.x))) + -> Function Scan on pg_catalog.unnest x + Output: x.x + Function Call: unnest('{0,1,2}'::integer[]) + -> Finalize Aggregate + Output: sum((_hyper_1_1_chunk.a + x.x)) + -> Append + -> Partial Aggregate + Output: PARTIAL sum((_hyper_1_1_chunk.a + x.x)) + -> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_1_1_chunk + Output: _hyper_1_1_chunk.a + -> Seq Scan on _timescaledb_internal.compress_hyper_2_3_chunk + Output: compress_hyper_2_3_chunk._ts_meta_count, compress_hyper_2_3_chunk.s, compress_hyper_2_3_chunk._ts_meta_min_1, compress_hyper_2_3_chunk._ts_meta_max_1, compress_hyper_2_3_chunk.a + -> Partial Aggregate + Output: PARTIAL sum((_hyper_1_2_chunk.a + x.x)) + -> Custom Scan (DecompressChunk) 
on _timescaledb_internal._hyper_1_2_chunk + Output: _hyper_1_2_chunk.a + -> Seq Scan on _timescaledb_internal.compress_hyper_2_4_chunk + Output: compress_hyper_2_4_chunk._ts_meta_count, compress_hyper_2_4_chunk.s, compress_hyper_2_4_chunk._ts_meta_min_1, compress_hyper_2_4_chunk._ts_meta_max_1, compress_hyper_2_4_chunk.a +(20 rows) + +select * from unnest(array[0, 1, 2]::int[]) x, lateral (select sum(a + x) from pvagg) xx; + x | sum +---+--------- + 0 | 1998000 + 1 | 1999998 + 2 | 2001996 +(3 rows) + +-- The plan for this query differs after PG16, x is not used as grouping key but +-- just added into the output targetlist of partial aggregation nodes. +select * from unnest(array[0, 1, 2]::int[]) x, lateral (select sum(a) from pvagg group by x) xx; + x | sum +---+--------- + 0 | 1998000 + 1 | 1998000 + 2 | 1998000 +(3 rows) + drop table pvagg; diff --git a/tsl/test/expected/vectorized_aggregation.out b/tsl/test/expected/vectorized_aggregation.out index 54a712d8c12..63ff6ed337c 100644 --- a/tsl/test/expected/vectorized_aggregation.out +++ b/tsl/test/expected/vectorized_aggregation.out @@ -3496,3 +3496,10 @@ SELECT sum(segment_by_value1) FROM testtable2 WHERE segment_by_value1 > 1000 AND (84 rows) RESET max_parallel_workers_per_gather; +-- Can't group by a system column +SELECT sum(float_value) FROM testtable2 GROUP BY tableoid ORDER BY 1 LIMIT 1; + sum +------- + 82620 +(1 row) + diff --git a/tsl/test/sql/vector_agg_param.sql b/tsl/test/sql/vector_agg_param.sql index 491a877556d..d695b839376 100644 --- a/tsl/test/sql/vector_agg_param.sql +++ b/tsl/test/sql/vector_agg_param.sql @@ -18,11 +18,24 @@ select count(compress_chunk(x)) from show_chunks('pvagg') x; analyze pvagg; +-- The reference for this test is generated using the standard Postgres +-- aggregation. When you change this test, recheck the results against the +-- Postgres aggregation by uncommenting the below GUC. +-- set timescaledb.enable_vectorized_aggregation to off; -explain (costs off) +explain (verbose, costs off) select * from unnest(array[0, 1, 2]::int[]) x, lateral (select sum(a) from pvagg where s = x) xx; select * from unnest(array[0, 1, 2]::int[]) x, lateral (select sum(a) from pvagg where s = x) xx; +explain (verbose, costs off) +select * from unnest(array[0, 1, 2]::int[]) x, lateral (select sum(a + x) from pvagg) xx; + +select * from unnest(array[0, 1, 2]::int[]) x, lateral (select sum(a + x) from pvagg) xx; + +-- The plan for this query differs after PG16, x is not used as grouping key but +-- just added into the output targetlist of partial aggregation nodes. +select * from unnest(array[0, 1, 2]::int[]) x, lateral (select sum(a) from pvagg group by x) xx; + drop table pvagg; diff --git a/tsl/test/sql/vectorized_aggregation.sql b/tsl/test/sql/vectorized_aggregation.sql index c8844932a93..bafecd6b544 100644 --- a/tsl/test/sql/vectorized_aggregation.sql +++ b/tsl/test/sql/vectorized_aggregation.sql @@ -403,3 +403,7 @@ SET max_parallel_workers_per_gather = 0; SELECT sum(segment_by_value1) FROM testtable2 WHERE segment_by_value1 > 1000 AND int_value > 1000; RESET max_parallel_workers_per_gather; + + +-- Can't group by a system column +SELECT sum(float_value) FROM testtable2 GROUP BY tableoid ORDER BY 1 LIMIT 1;
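
A minimal sketch of the cross-check described in the vector_agg_param.sql comment above (an assumed illustration for reviewers, not part of the patch): disable vectorized aggregation via the timescaledb.enable_vectorized_aggregation GUC, rerun the lateral-parameter query from that test, and compare the result against the vectorized run.

-- Assumed illustration only: verify the vectorized result against the standard
-- Postgres aggregation, as suggested by the comment in tsl/test/sql/vector_agg_param.sql.
set timescaledb.enable_vectorized_aggregation to off;
select * from unnest(array[0, 1, 2]::int[]) x, lateral (select sum(a) from pvagg where s = x) xx;
reset timescaledb.enable_vectorized_aggregation;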